CAMEL-8055 Added the atmos component implementation
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7d292021 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7d292021 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7d292021 Branch: refs/heads/camel-atmos Commit: 7d2920217b9bcc91bc2eaaab21cd37a52502f876 Parents: e6f9f02 Author: Alessio Cervellin <alessio.cervel...@3bitcom.com> Authored: Tue Nov 18 19:36:22 2014 +0100 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Wed Nov 19 15:44:50 2014 +0800 ---------------------------------------------------------------------- components/camel-atmos/pom.xml | 116 ++++++++ .../camel/component/atmos/AtmosComponent.java | 75 ++++++ .../component/atmos/AtmosConfiguration.java | 160 +++++++++++ .../camel/component/atmos/AtmosEndpoint.java | 99 +++++++ .../component/atmos/core/AtmosAPIFacade.java | 269 +++++++++++++++++++ .../component/atmos/dto/AtmosDelResult.java | 35 +++ .../atmos/dto/AtmosFileDownloadResult.java | 59 ++++ .../atmos/dto/AtmosFileUploadResult.java | 59 ++++ .../component/atmos/dto/AtmosMoveResult.java | 36 +++ .../camel/component/atmos/dto/AtmosResult.java | 40 +++ .../consumer/AtmosScheduledPollConsumer.java | 72 +++++ .../consumer/AtmosScheduledPollGetConsumer.java | 56 ++++ .../integration/producer/AtmosDelProducer.java | 40 +++ .../integration/producer/AtmosGetProducer.java | 43 +++ .../integration/producer/AtmosMoveProducer.java | 39 +++ .../integration/producer/AtmosProducer.java | 59 ++++ .../integration/producer/AtmosPutProducer.java | 43 +++ .../component/atmos/util/AtmosConstants.java | 26 ++ .../component/atmos/util/AtmosException.java | 30 +++ .../component/atmos/util/AtmosOperation.java | 38 +++ .../atmos/util/AtmosPropertyManager.java | 62 +++++ .../component/atmos/util/AtmosResultCode.java | 22 ++ .../component/atmos/util/AtmosResultHeader.java | 22 ++ .../validator/AtmosConfigurationValidator.java | 128 +++++++++ 
.../src/main/resources/META-INF/LICENSE.txt | 203 ++++++++++++++ .../src/main/resources/META-INF/NOTICE.txt | 11 + .../services/org/apache/camel/component/atmos | 18 ++ .../src/main/resources/atmos.properties | 21 ++ .../component/atmos/AtmosConsumerTest.java | 53 ++++ .../atmos/integration/AtmosTestSupport.java | 55 ++++ .../consumer/AtmosConsumerGetSingleTest.java | 59 ++++ .../producer/AtmosProducerDelTest.java | 66 +++++ .../producer/AtmosProducerGetFolderTest.java | 69 +++++ .../producer/AtmosProducerGetSingleTest.java | 67 +++++ .../producer/AtmosProducerMoveTest.java | 67 +++++ ...tmosProducerPutFolderWithRemotePathTest.java | 67 +++++ .../AtmosProducerPutSingleFileTest.java | 67 +++++ ...ProducerPutSingleFileWithRemotePathTest.java | 67 +++++ .../src/test/resources/log4j.properties | 35 +++ .../src/test/resources/test-options.properties | 21 ++ components/pom.xml | 1 + 41 files changed, 2575 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/7d292021/components/camel-atmos/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-atmos/pom.xml b/components/camel-atmos/pom.xml new file mode 100644 index 0000000..c18de89 --- /dev/null +++ b/components/camel-atmos/pom.xml @@ -0,0 +1,116 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components</artifactId> + <version>2.15-SNAPSHOT</version> + </parent> + + <artifactId>camel-atmos</artifactId> + <packaging>bundle</packaging> + <name>Camel :: Atmos</name> + <description>Camel Atmos support</description> + + <properties> + <camel.osgi.export.pkg>org.apache.camel.component.atmos.*</camel.osgi.export.pkg> + <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=atmos</camel.osgi.export.service> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-core</artifactId> + </dependency> + + <!-- atmos --> + <dependency> + <groupId>com.emc.vipr</groupId> + <artifactId>atmos-client</artifactId> + <version>2.2.2</version> + </dependency> + + <!-- apache commons-io --> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>${commons-io-version}</version> + </dependency> + + <!-- testing --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + 
</dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- Allows the routes to be run via 'mvn camel:run' --> + <plugin> + <groupId>org.apache.camel</groupId> + <artifactId>camel-maven-plugin</artifactId> + <version>${project.version}</version> + </plugin> + </plugins> + </build> + + <profiles> + <profile> + <id>no-integration-test</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <excludes> + <exclude> + **/integration/** + </exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> + </profile> + </profiles> + +</project> http://git-wip-us.apache.org/repos/asf/camel/blob/7d292021/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/AtmosComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/AtmosComponent.java b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/AtmosComponent.java new file mode 100644 index 0000000..576c955 --- /dev/null +++ b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/AtmosComponent.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.camel.component.atmos; + +import java.util.Map; + +import org.apache.camel.Endpoint; +import org.apache.camel.component.atmos.util.AtmosOperation; +import org.apache.camel.component.atmos.util.AtmosPropertyManager; +import org.apache.camel.component.atmos.validator.AtmosConfigurationValidator; +import org.apache.camel.impl.DefaultComponent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AtmosComponent extends DefaultComponent { + + private static final transient Logger LOG = LoggerFactory.getLogger(AtmosComponent.class); + + /** + * Create a camel endpoint after passing validation on the incoming url. + * + * @param uri the full URI of the endpoint + * @param remaining the remaining part of the URI without the query + * parameters or component prefix + * @param parameters the optional parameters passed in + * @return the camel endpoint + * @throws Exception + */ + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + AtmosConfiguration configuration = new AtmosConfiguration(); + + // set options from component + configuration.setUri((String) parameters.get("uri") == null + ? AtmosPropertyManager.getInstance().getProperty("uri") + : (String) parameters.get("uri")); + configuration.setSecretKey((String) parameters.get("secretKey") == null + ? 
AtmosPropertyManager.getInstance().getProperty("secretKey") + : (String) parameters.get("secretKey")); + configuration.setLocalPath((String) parameters.get("localPath")); + configuration.setRemotePath((String) parameters.get("remotePath")); + configuration.setNewRemotePath((String) parameters.get("newRemotePath")); + configuration.setQuery((String) parameters.get("query")); + configuration.setOperation(AtmosOperation.valueOf(remaining)); + configuration.setFullTokenId(parameters.get("fullTokenId") == null + ? AtmosPropertyManager.getInstance().getProperty("fullTokenId") + : (String) parameters.get("fullTokenId")); + configuration.setEnableSslValidation(Boolean.parseBoolean((String) AtmosPropertyManager.getInstance().getProperty("sslValidation"))); + + //pass validation test + AtmosConfigurationValidator.validate(configuration); + + // and then override from parameters + setProperties(configuration, parameters); + + LOG.info("atmos configuration set!"); + + Endpoint endpoint = new AtmosEndpoint(uri, this, configuration); + return endpoint; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/7d292021/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/AtmosConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/AtmosConfiguration.java b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/AtmosConfiguration.java new file mode 100644 index 0000000..43a7386 --- /dev/null +++ b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/AtmosConfiguration.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.camel.component.atmos; + +import com.emc.atmos.api.AtmosApi; +import com.emc.atmos.api.AtmosConfig; +import com.emc.atmos.api.jersey.AtmosApiClient; +import java.net.URI; +import java.net.URISyntaxException; + +import org.apache.camel.component.atmos.util.AtmosException; +import org.apache.camel.component.atmos.util.AtmosOperation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AtmosConfiguration { + + private static final transient Logger LOG = LoggerFactory.getLogger(AtmosConfiguration.class); + + //atmos shared secret + private String secretKey; + //local path to put files + private String localPath; + //where to put files on atmos + private String remotePath; + //new path on atmos when moving files + private String newRemotePath; + //search query on atmos + private String query; + //atmos client fullTokenId + private String fullTokenId; + //atmos server uri + private String uri; + //atmos ssl validation + private boolean enableSslValidation; + //specific atmos operation for the component + private AtmosOperation operation; + //reference to atmos client + private AtmosApi client; + + public void setClient(AtmosApi client) { + this.client = client; + } + + public AtmosApi getClient() { + return client; + } + + /** + * Obtain a new instance of AtmosApi client and store it in configuration. 
+ * + * @throws AtmosException + */ + public void createClient() throws AtmosException { + AtmosConfig config = null; + try { + config = new AtmosConfig(fullTokenId, secretKey, new URI(uri)); + } catch (URISyntaxException use) { + throw new AtmosException("wrong syntax for Atmos URI!", use); + } + if (config == null) { + throw new AtmosException("can't configure the Atmos client!"); + } + if (!enableSslValidation) { + config.setDisableSslValidation(true); + } + AtmosApi atmosclient = new AtmosApiClient(config); + if (atmosclient == null) { + throw new AtmosException("can't establish an Atmos connection!"); + } + this.client = atmosclient; + } + + public String getSecretKey() { + return secretKey; + } + + public void setSecretKey(String secretKey) { + this.secretKey = secretKey; + } + + public String getLocalPath() { + return localPath; + } + + public void setLocalPath(String localPath) { + this.localPath = localPath; + } + + public String getRemotePath() { + return remotePath; + } + + public void setRemotePath(String remotePath) { + this.remotePath = remotePath; + } + + public String getNewRemotePath() { + return newRemotePath; + } + + public void setNewRemotePath(String newRemotePath) { + this.newRemotePath = newRemotePath; + } + + public String getQuery() { + return query; + } + + public void setQuery(String query) { + this.query = query; + } + + public String getFullTokenId() { + return fullTokenId; + } + + public void setFullTokenId(String fullTokenId) { + this.fullTokenId = fullTokenId; + } + + public AtmosOperation getOperation() { + return operation; + } + + public void setOperation(AtmosOperation operation) { + this.operation = operation; + } + + public void setUri(String uri) { + this.uri = uri; + } + + public String getUri() { + return uri; + } + + public boolean isEnableSslValidation() { + return enableSslValidation; + } + + public void setEnableSslValidation(boolean enableSslValidation) { + this.enableSslValidation = enableSslValidation; + } + +} 
http://git-wip-us.apache.org/repos/asf/camel/blob/7d292021/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/AtmosEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/AtmosEndpoint.java b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/AtmosEndpoint.java new file mode 100644 index 0000000..f272ab4 --- /dev/null +++ b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/AtmosEndpoint.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.camel.component.atmos; + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.atmos.integration.consumer.AtmosScheduledPollConsumer; +import org.apache.camel.component.atmos.integration.consumer.AtmosScheduledPollGetConsumer; +import org.apache.camel.component.atmos.integration.producer.AtmosDelProducer; +import org.apache.camel.component.atmos.integration.producer.AtmosGetProducer; +import org.apache.camel.component.atmos.integration.producer.AtmosMoveProducer; +import org.apache.camel.component.atmos.integration.producer.AtmosPutProducer; +import org.apache.camel.component.atmos.util.AtmosException; +import org.apache.camel.component.atmos.util.AtmosOperation; +import org.apache.camel.impl.DefaultEndpoint; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.camel.component.atmos.util.AtmosConstants.POLL_CONSUMER_DELAY; + +public class AtmosEndpoint extends DefaultEndpoint { + + private static final transient Logger LOG = LoggerFactory.getLogger(AtmosEndpoint.class); + + private AtmosConfiguration configuration; + + public AtmosEndpoint() { + } + + public AtmosEndpoint(String uri, AtmosComponent component, AtmosConfiguration configuration) { + super(uri, component); + this.configuration = configuration; + } + + public AtmosEndpoint(String endpointUri) { + super(endpointUri); + } + + /** + * Create one of the camel producer available based on the configuration + * + * @return the camel producer + * @throws Exception + */ + public Producer createProducer() throws Exception { + LOG.info("resolve producer atmos endpoint {" + configuration.getOperation().toString() + "}"); + LOG.info("resolve producer atmos attached client: " + configuration.getClient()); + if (configuration.getOperation() == AtmosOperation.put) { + return new AtmosPutProducer(this, configuration); + } else if (this.configuration.getOperation() == 
AtmosOperation.del) { + return new AtmosDelProducer(this, configuration); + } else if (this.configuration.getOperation() == AtmosOperation.get) { + return new AtmosGetProducer(this, configuration); + } else if (this.configuration.getOperation() == AtmosOperation.move) { + return new AtmosMoveProducer(this, configuration); + } else { + throw new AtmosException("operation specified is not valid for producer!"); + } + } + + /** + * Create one of the camel consumer available based on the configuration + * + * @param processor the given processor + * @return the camel consumer + * @throws Exception + */ + public Consumer createConsumer(Processor processor) throws Exception { + LOG.debug("resolve consumer atmos endpoint {" + configuration.getOperation().toString() + "}"); + LOG.debug("resolve consumer atmos attached client:" + configuration.getClient()); + AtmosScheduledPollConsumer consumer = null; + if (this.configuration.getOperation() == AtmosOperation.get) { + consumer = new AtmosScheduledPollGetConsumer(this, processor, configuration); + consumer.setDelay(POLL_CONSUMER_DELAY); + return consumer; + } else { + throw new AtmosException("operation specified is not valid for consumer!"); + } + } + + public boolean isSingleton() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/7d292021/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/core/AtmosAPIFacade.java ---------------------------------------------------------------------- diff --git a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/core/AtmosAPIFacade.java b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/core/AtmosAPIFacade.java new file mode 100644 index 0000000..852193e --- /dev/null +++ b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/core/AtmosAPIFacade.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.camel.component.atmos.core; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import com.emc.atmos.api.AtmosApi; +import com.emc.atmos.api.ObjectId; +import com.emc.atmos.api.ObjectPath; +import com.emc.atmos.api.bean.DirectoryEntry; +import com.emc.atmos.api.request.CreateObjectRequest; +import com.emc.atmos.api.request.ListDirectoryRequest; +import org.apache.camel.component.atmos.dto.AtmosDelResult; +import org.apache.camel.component.atmos.dto.AtmosFileDownloadResult; +import org.apache.camel.component.atmos.dto.AtmosFileUploadResult; +import org.apache.camel.component.atmos.dto.AtmosMoveResult; +import org.apache.camel.component.atmos.dto.AtmosResult; +import org.apache.camel.component.atmos.util.AtmosException; +import org.apache.camel.component.atmos.util.AtmosResultCode; +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.camel.component.atmos.util.AtmosConstants.ATMOS_FILE_SEPARATOR; + +public final class AtmosAPIFacade { + + private static final transient Logger LOG = LoggerFactory.getLogger(AtmosAPIFacade.class); + + 
private static AtmosAPIFacade instance; + private static AtmosApi client; + + private AtmosAPIFacade() { + } + + /** + * Return a singleton instance of this class + * + * @param client the AtmosClient performing atmos low level operations + * @return the singleton instance of this class + */ + public static AtmosAPIFacade getInstance(AtmosApi client) { + if (instance == null) { + instance = new AtmosAPIFacade(); + AtmosAPIFacade.client = client; + } + return instance; + } + + /** + * Put or upload a new file or an entire directory to atmos + * + * @param localPath the file path or the dir path on the local filesystem + * @param remotePath the remote path destination on atmos + * a file already existing with the same name + * will be overwritten. + * @return an AtmosResult object reporting for each remote path the result of + * the operation. + * @throws AtmosException + */ + public AtmosResult put(String localPath, String remotePath) throws AtmosException { + AtmosResult result = new AtmosFileUploadResult(); + //a map representing for each path the result of the put operation + Map<String, AtmosResultCode> resultEntries = null; + //in case the remote path is not specified, the remotePath = localPath + String atmosPath = remotePath == null ? 
localPath : remotePath; + if (!atmosPath.endsWith(ATMOS_FILE_SEPARATOR)) { + atmosPath += ATMOS_FILE_SEPARATOR; + } + ObjectPath atmosEntry = new ObjectPath(atmosPath); + + if (!atmosPath.equals(ATMOS_FILE_SEPARATOR)) { + if (AtmosAPIFacade.client.getSystemMetadata(atmosEntry) == null) { + throw new AtmosException(atmosPath + " does not exist or can't obtain metadata"); + } + } + + File fileLocalPath = new File(localPath); + //verify uploading of a single file + if (fileLocalPath.isFile()) { + //check if atmos file exists + if (atmosEntry != null && !atmosEntry.isDirectory()) { + throw new AtmosException(atmosPath + " exists on atmos and is not a folder!"); + } + atmosPath = atmosPath + fileLocalPath.getName(); + resultEntries = new HashMap<String, AtmosResultCode>(1); + try { + ObjectId uploadedFile = putSingleFile(fileLocalPath, atmosPath); + if (uploadedFile == null) { + resultEntries.put(atmosPath, AtmosResultCode.KO); + } else { + resultEntries.put(atmosPath, AtmosResultCode.OK); + } + + } catch (Exception ex) { + resultEntries.put(atmosPath, AtmosResultCode.KO); + } finally { + result.setResultEntries(resultEntries); + } + return result; + } else { //verify uploading of a list of files inside a dir + LOG.info("uploading a dir..."); + //check if atmos folder exists + if (atmosEntry != null && !atmosEntry.isDirectory()) { + throw new AtmosException(atmosPath + " exists on atmos and is not a folder!"); + } + //revert to old path + String oldAtmosPath = atmosPath; + //list all files in a dir + Collection<File> listFiles = FileUtils.listFiles(fileLocalPath, null, true); + if (listFiles == null || listFiles.isEmpty()) { + throw new AtmosException(localPath + " doesn't contain any files"); + } + resultEntries = new HashMap<String, AtmosResultCode>(listFiles.size()); + for (File file : listFiles) { + String absPath = file.getAbsolutePath(); + int indexRemainingPath = localPath.length(); + if (!localPath.endsWith("/")) { + indexRemainingPath += 1; + } + String 
remainingPath = absPath.substring(indexRemainingPath); + atmosPath = atmosPath + remainingPath; + try { + LOG.info("uploading:" + fileLocalPath + "," + atmosPath); + ObjectId uploadedFile = putSingleFile(file, atmosPath); + if (uploadedFile == null) { + resultEntries.put(atmosPath, AtmosResultCode.KO); + } else { + resultEntries.put(atmosPath, AtmosResultCode.OK); + } + } catch (Exception ex) { + resultEntries.put(atmosPath, AtmosResultCode.KO); + } + atmosPath = oldAtmosPath; + } + result.setResultEntries(resultEntries); + return result; + } + } + + private ObjectId putSingleFile(File inputFile, String atmosPath) throws Exception { + FileInputStream inputStream = new FileInputStream(inputFile); + ObjectId uploadedFile = null; + try { + ObjectPath op = new ObjectPath(atmosPath); + CreateObjectRequest request = new CreateObjectRequest(); + request.identifier(op).content(inputStream).contentLength(inputFile.length()); + uploadedFile = AtmosAPIFacade.client.createObject(request).getObjectId(); + return uploadedFile; + } finally { + inputStream.close(); + } + } + + /** + * Delete every file and subdirectory inside the remote directory. In + * case the remotePath is a file, delete the file. + * + * @param remotePath the remote location to delete + * @return an AtmosResult object with the result of the delete operation. + * @throws AtmosException + */ + public AtmosResult del(String remotePath) throws AtmosException { + AtmosResult result = null; + ObjectPath op = new ObjectPath(remotePath); + AtmosAPIFacade.client.delete(op); + result = new AtmosDelResult(); + result.setResultEntries(remotePath); + return result; + } + + /** + * Rename a remote path with the new path location. + * + * @param remotePath the existing remote path to be renamed + * @param newRemotePath the new remote path substituting the old one + * @return an AtmosResult object with the result of the move operation. 
+ * @throws AtmosException + */ + public AtmosResult move(String remotePath, String newRemotePath) throws AtmosException { + AtmosResult result = null; + AtmosAPIFacade.client.move(new ObjectPath(remotePath), new ObjectPath(newRemotePath), true); + result = new AtmosMoveResult(); + result.setResultEntries(remotePath + "-" + newRemotePath); + return result; + } + + /** + * Get the content of every file inside the remote path. + * + * @param remotePath the remote path to download from + * @return an AtmosResult object with the content (ByteArrayOutputStream) of + * every file inside the remote path. + * @throws AtmosException + */ + public AtmosResult get(String remotePath) throws AtmosException { + AtmosResult result = new AtmosFileDownloadResult(); + //a map representing for each path the result of the baos + Map<String, ByteArrayOutputStream> resultEntries = new HashMap<String, ByteArrayOutputStream>(); + //iterate from the remotePath + downloadFilesInFolder(remotePath, resultEntries); + //put the map of baos as result + result.setResultEntries(resultEntries); + return result; + } + + private void downloadFilesInFolder(String atmosPath, Map<String, ByteArrayOutputStream> resultEntries) throws AtmosException { + ObjectPath atmosEntry = new ObjectPath(atmosPath); + if (AtmosAPIFacade.client.getSystemMetadata(atmosEntry) == null) { + throw new AtmosException(atmosPath + " does not exist or can't obtain metadata"); + } + if (!atmosEntry.isDirectory()) { + LOG.info("downloading a single file..."); + downloadSingleFile(atmosPath, resultEntries); + return; + } + ListDirectoryRequest listRequest = new ListDirectoryRequest().path(atmosEntry); + AtmosAPIFacade.client.listDirectory(listRequest); + for (DirectoryEntry entry : AtmosAPIFacade.client.listDirectory(listRequest).getEntries()) { + if (!entry.isDirectory()) { + try { + //get the baos of the file + downloadSingleFile(atmosEntry.getPath().concat(entry.getFilename()), resultEntries); + } catch (AtmosException e) 
{ + LOG.warn("can't download from " + entry.getFilename()); + } + } else { + //iterate on folder + downloadFilesInFolder(atmosEntry.getPath().concat(entry.getFilename()), resultEntries); + } + } + } + + private void downloadSingleFile(String path, Map<String, ByteArrayOutputStream> resultEntries) throws AtmosException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] content = null; + try { + content = AtmosAPIFacade.client.readObject(new ObjectPath(path), byte[].class); + baos.write(content); + } catch (IOException e) { + throw new AtmosException(path + " can't obtain a stream", e); + } + if (content != null) { + resultEntries.put(path, baos); + LOG.info("downloaded path:" + path + " - baos size:" + baos.size()); + } + + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/7d292021/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/dto/AtmosDelResult.java ---------------------------------------------------------------------- diff --git a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/dto/AtmosDelResult.java b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/dto/AtmosDelResult.java new file mode 100644 index 0000000..bd9927e --- /dev/null +++ b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/dto/AtmosDelResult.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atmos.dto; + +import org.apache.camel.Exchange; +import org.apache.camel.component.atmos.util.AtmosResultHeader; + +public class AtmosDelResult extends AtmosResult { + + /** + * Object payload contained in Exchange + * Exchange Body is populated with the remote path deleted on atmos. + * @param exchange + */ + @Override + public void populateExchange(Exchange exchange) { + String remotePath = (String)resultEntries; + exchange.getIn().setHeader(AtmosResultHeader.DELETED_PATH.name(), remotePath); + exchange.getIn().setBody(remotePath); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/7d292021/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/dto/AtmosFileDownloadResult.java ---------------------------------------------------------------------- diff --git a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/dto/AtmosFileDownloadResult.java b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/dto/AtmosFileDownloadResult.java new file mode 100644 index 0000000..1b18264 --- /dev/null +++ b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/dto/AtmosFileDownloadResult.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atmos.dto; + +import java.io.ByteArrayOutputStream; +import java.util.Map; + +import org.apache.camel.Exchange; +import org.apache.camel.component.atmos.util.AtmosResultHeader; + +public class AtmosFileDownloadResult extends AtmosResult { + + /** + * Object payload contained in Exchange + * In case of a single file Exchange Header is populated with the name of the remote path downloaded + * In case of a multiple files Exchange Header is populated with the name of the remote paths downloaded + * In case of a single file Exchange Body is populated with the ByteArrayOutputStream downloaded from atmos. 
+ * In case of multiple files Exchange Body is populated with a map containing as key the remote path + * and as value the linked ByteArrayOutputStream + * @param exchange + */ + @Override + public void populateExchange(Exchange exchange) { + //in case we have only one baos put it directly in body + Map<String, ByteArrayOutputStream> map = (Map<String, ByteArrayOutputStream>)resultEntries; + if (map.size() == 1) { + //set info in exchange + String pathExtracted = null; + ByteArrayOutputStream baosExtracted = null; + for (Map.Entry<String, ByteArrayOutputStream> entry : map.entrySet()) { + pathExtracted = entry.getKey(); + baosExtracted = entry.getValue(); + } + exchange.getIn().setHeader(AtmosResultHeader.DOWNLOADED_FILE.name(), pathExtracted); + exchange.getIn().setBody(baosExtracted); + } else { + StringBuffer pathsExtracted = new StringBuffer(); + for (Map.Entry<String, ByteArrayOutputStream> entry : map.entrySet()) { + pathsExtracted.append(entry.getKey() + "\n"); + } + exchange.getIn().setHeader(AtmosResultHeader.DOWNLOADED_FILES.name(), pathsExtracted.toString()); + exchange.getIn().setBody(map); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/7d292021/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/dto/AtmosFileUploadResult.java ---------------------------------------------------------------------- diff --git a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/dto/AtmosFileUploadResult.java b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/dto/AtmosFileUploadResult.java new file mode 100644 index 0000000..c7b4dcf --- /dev/null +++ b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/dto/AtmosFileUploadResult.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atmos.dto; + +import java.util.Map; + +import org.apache.camel.Exchange; +import org.apache.camel.component.atmos.util.AtmosResultCode; +import org.apache.camel.component.atmos.util.AtmosResultHeader; + + +public class AtmosFileUploadResult extends AtmosResult { + + /** + * Object payload contained in Exchange + * In case of a single file Exchange Header is populated with the name of the remote path uploaded + * In case of a multiple files Exchange Header is populated with the name of the remote paths uploaded + * In case of a single file Exchange Body is populated with the result code of the upload operation for the remote path. 
+ * In case of multiple files Exchange Body is populated with a map containing as key the remote path uploaded + * and as value the result code of the upload operation + * @param exchange + */ + @Override + public void populateExchange(Exchange exchange) { + Map<String, AtmosResultCode> map = (Map<String, AtmosResultCode>)resultEntries; + if (map.size() == 1) { + //set info in exchange + String pathExtracted = null; + AtmosResultCode codeExtracted = null; + for (Map.Entry<String, AtmosResultCode> entry : map.entrySet()) { + pathExtracted = entry.getKey(); + codeExtracted = entry.getValue(); + } + exchange.getIn().setHeader(AtmosResultHeader.UPLOADED_FILE.name(), pathExtracted); + exchange.getIn().setBody(codeExtracted.name()); + } else { + StringBuffer pathsExtracted = new StringBuffer(); + for (Map.Entry<String, AtmosResultCode> entry : map.entrySet()) { + pathsExtracted.append(entry.getKey() + "\n"); + } + exchange.getIn().setHeader(AtmosResultHeader.UPLOADED_FILES.name(), pathsExtracted.toString()); + exchange.getIn().setBody(map); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/7d292021/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/dto/AtmosMoveResult.java ---------------------------------------------------------------------- diff --git a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/dto/AtmosMoveResult.java b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/dto/AtmosMoveResult.java new file mode 100644 index 0000000..e745da5 --- /dev/null +++ b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/dto/AtmosMoveResult.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atmos.dto; + +import org.apache.camel.Exchange; +import org.apache.camel.component.atmos.util.AtmosResultHeader; + + +public class AtmosMoveResult extends AtmosResult { + + /** + * Object payload contained in Exchange + * Exchange Header and Body contains the moved path + * @param exchange + */ + @Override + public void populateExchange(Exchange exchange) { + String movedPath = (String)resultEntries; + exchange.getIn().setHeader(AtmosResultHeader.MOVED_PATH.name(), movedPath); + exchange.getIn().setBody(movedPath); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/7d292021/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/dto/AtmosResult.java ---------------------------------------------------------------------- diff --git a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/dto/AtmosResult.java b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/dto/AtmosResult.java new file mode 100644 index 0000000..74e22d3 --- /dev/null +++ b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/dto/AtmosResult.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atmos.dto; + +import org.apache.camel.Exchange; + + +public abstract class AtmosResult { + + protected Object resultEntries; + + /** + * Populate the camel exchange with the results from atmos method invocations. + * @param exchange + */ + public abstract void populateExchange(Exchange exchange); + + public Object getResultEntries() { + return resultEntries; + } + + public void setResultEntries(Object resultEntries) { + this.resultEntries = resultEntries; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/7d292021/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollConsumer.java b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollConsumer.java new file mode 100644 index 0000000..24863b0 --- /dev/null +++ b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollConsumer.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atmos.integration.consumer; + +import org.apache.camel.Processor; +import org.apache.camel.component.atmos.AtmosConfiguration; +import org.apache.camel.component.atmos.AtmosEndpoint; +import org.apache.camel.impl.ScheduledPollConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public abstract class AtmosScheduledPollConsumer extends ScheduledPollConsumer { + protected static final transient Logger LOG = LoggerFactory.getLogger(AtmosScheduledPollConsumer.class); + protected AtmosEndpoint endpoint; + protected AtmosConfiguration configuration; + + public AtmosScheduledPollConsumer(AtmosEndpoint endpoint, Processor processor, AtmosConfiguration configuration) { + super(endpoint, processor); + this.endpoint = endpoint; + this.configuration = configuration; + } + + @Override + protected abstract int poll() throws Exception; + + /** + * Lifecycle method invoked when the consumer has created. 
+ * Internally create or reuse a connection to the low level atmos client + * @throws Exception + */ + @Override + protected void doStart() throws Exception { + if (configuration.getClient() == null) { + //create atmos client + configuration.createClient(); + + LOG.info("consumer atmos client created"); + } + + super.doStart(); + } + + /** + * Lifecycle method invoked when the consumer has destroyed. + * Erase the reference to the atmos low level client + * @throws Exception + */ + @Override + protected void doStop() throws Exception { + if (configuration.getClient() == null) { + configuration.setClient(null); + + LOG.info("consumer atmos client deleted"); + } + super.doStop(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/7d292021/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollGetConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollGetConsumer.java b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollGetConsumer.java new file mode 100644 index 0000000..a97f15e --- /dev/null +++ b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollGetConsumer.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atmos.integration.consumer; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.atmos.AtmosConfiguration; +import org.apache.camel.component.atmos.AtmosEndpoint; +import org.apache.camel.component.atmos.core.AtmosAPIFacade; +import org.apache.camel.component.atmos.dto.AtmosResult; + +public class AtmosScheduledPollGetConsumer extends AtmosScheduledPollConsumer { + + public AtmosScheduledPollGetConsumer(AtmosEndpoint endpoint, Processor processor, AtmosConfiguration configuration) { + super(endpoint, processor, configuration); + } + + /** + * Poll from an atmos remote path and put the result in the message exchange + * @return number of messages polled + * @throws Exception + */ + @Override + protected int poll() throws Exception { + Exchange exchange = endpoint.createExchange(); + AtmosResult result = AtmosAPIFacade.getInstance(configuration.getClient()) + .get(configuration.getRemotePath()); + result.populateExchange(exchange); + LOG.info("consumer --> downloaded: " + result.toString()); + + try { + // send message to next processor in the route + getProcessor().process(exchange); + return 1; // number of messages polled + } finally { + // log exception if an exception occurred and was not handled + if (exchange.getException() != null) { + getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/camel/blob/7d292021/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/producer/AtmosDelProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/producer/AtmosDelProducer.java b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/producer/AtmosDelProducer.java new file mode 100644 index 0000000..bf8c9b7 --- /dev/null +++ b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/producer/AtmosDelProducer.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.atmos.integration.producer; + +import org.apache.camel.Exchange; +import org.apache.camel.component.atmos.AtmosConfiguration; +import org.apache.camel.component.atmos.AtmosEndpoint; +import org.apache.camel.component.atmos.core.AtmosAPIFacade; +import org.apache.camel.component.atmos.dto.AtmosResult; + +public class AtmosDelProducer extends AtmosProducer { + + public AtmosDelProducer(AtmosEndpoint endpoint, AtmosConfiguration configuration) { + super(endpoint, configuration); + } + + @Override + public void process(Exchange exchange) throws Exception { + AtmosResult result = AtmosAPIFacade.getInstance(configuration.getClient()) + .del(configuration.getRemotePath()); + result.populateExchange(exchange); + log.info("Deleted: " + configuration.getRemotePath()); + + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/7d292021/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/producer/AtmosGetProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/producer/AtmosGetProducer.java b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/producer/AtmosGetProducer.java new file mode 100644 index 0000000..256ad97 --- /dev/null +++ b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/producer/AtmosGetProducer.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atmos.integration.producer; + +import org.apache.camel.Exchange; +import org.apache.camel.component.atmos.AtmosConfiguration; +import org.apache.camel.component.atmos.AtmosEndpoint; +import org.apache.camel.component.atmos.core.AtmosAPIFacade; +import org.apache.camel.component.atmos.dto.AtmosResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AtmosGetProducer extends AtmosProducer { + private static final transient Logger LOG = LoggerFactory.getLogger(AtmosGetProducer.class); + + public AtmosGetProducer(AtmosEndpoint endpoint, AtmosConfiguration configuration) { + super(endpoint, configuration); + } + + @Override + public void process(Exchange exchange) throws Exception { + AtmosResult result = AtmosAPIFacade.getInstance(configuration.getClient()) + .get(configuration.getRemotePath()); + result.populateExchange(exchange); + LOG.info("producer --> downloaded: " + result.toString()); + + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/7d292021/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/producer/AtmosMoveProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/producer/AtmosMoveProducer.java b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/producer/AtmosMoveProducer.java new file mode 100644 index 0000000..a33536a --- /dev/null +++ 
b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/producer/AtmosMoveProducer.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atmos.integration.producer; + +import org.apache.camel.Exchange; +import org.apache.camel.component.atmos.AtmosConfiguration; +import org.apache.camel.component.atmos.AtmosEndpoint; +import org.apache.camel.component.atmos.core.AtmosAPIFacade; +import org.apache.camel.component.atmos.dto.AtmosResult; + +public class AtmosMoveProducer extends AtmosProducer { + + public AtmosMoveProducer(AtmosEndpoint endpoint, AtmosConfiguration configuration) { + super(endpoint, configuration); + } + + @Override + public void process(Exchange exchange) throws Exception { + AtmosResult result = AtmosAPIFacade.getInstance(configuration.getClient()) + .move(configuration.getRemotePath(), configuration.getNewRemotePath()); + result.populateExchange(exchange); + log.info("Moved from " + configuration.getRemotePath() + " to " + configuration.getNewRemotePath()); + } + +} 
http://git-wip-us.apache.org/repos/asf/camel/blob/7d292021/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/producer/AtmosProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/producer/AtmosProducer.java b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/producer/AtmosProducer.java new file mode 100644 index 0000000..afea01c --- /dev/null +++ b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/producer/AtmosProducer.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.atmos.integration.producer; + +import org.apache.camel.component.atmos.AtmosConfiguration; +import org.apache.camel.component.atmos.AtmosEndpoint; +import org.apache.camel.impl.DefaultProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AtmosProducer extends DefaultProducer { + + private static final transient Logger LOG = LoggerFactory.getLogger(AtmosProducer.class); + + protected AtmosEndpoint endpoint; + protected AtmosConfiguration configuration; + + public AtmosProducer(AtmosEndpoint endpoint, AtmosConfiguration configuration) { + super(endpoint); + this.endpoint = endpoint; + this.configuration = configuration; + } + + @Override + protected void doStart() throws Exception { + if (configuration.getClient() == null) { + //create atmos client + configuration.createClient(); + + LOG.info("producer atmos client created"); + } + + super.doStart(); + } + + @Override + protected void doStop() throws Exception { + if (configuration.getClient() == null) { + configuration.setClient(null); + + LOG.info("producer atmos client deleted"); + } + super.doStop(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/7d292021/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/producer/AtmosPutProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/producer/AtmosPutProducer.java b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/producer/AtmosPutProducer.java new file mode 100644 index 0000000..c6cfa1f --- /dev/null +++ b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/producer/AtmosPutProducer.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.atmos.integration.producer; + +import org.apache.camel.Exchange; +import org.apache.camel.component.atmos.AtmosConfiguration; +import org.apache.camel.component.atmos.AtmosEndpoint; +import org.apache.camel.component.atmos.core.AtmosAPIFacade; +import org.apache.camel.component.atmos.dto.AtmosResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AtmosPutProducer extends AtmosProducer { + private static final transient Logger LOG = LoggerFactory.getLogger(AtmosPutProducer.class); + + public AtmosPutProducer(AtmosEndpoint endpoint, AtmosConfiguration configuration) { + super(endpoint, configuration); + } + + @Override + public void process(Exchange exchange) throws Exception { + AtmosResult result = AtmosAPIFacade.getInstance(configuration.getClient()) + .put(configuration.getLocalPath(), configuration.getRemotePath()); + result.populateExchange(exchange); + LOG.info("Uploaded: " + result.toString()); + + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/7d292021/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/util/AtmosConstants.java ---------------------------------------------------------------------- diff --git 
a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/util/AtmosConstants.java b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/util/AtmosConstants.java new file mode 100644 index 0000000..3049e67 --- /dev/null +++ b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/util/AtmosConstants.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Constants shared by the atmos component. Not instantiable.
 */
public final class AtmosConstants {

    /** Separator placed between the segments of a remote path. */
    public static final String ATMOS_FILE_SEPARATOR = "/";

    /**
     * One hour expressed in milliseconds.
     * NOTE(review): presumably the default delay between consumer polls - confirm against the endpoint.
     */
    public static final long POLL_CONSUMER_DELAY = 1000L * 60 * 60;

    private AtmosConstants() {
        // constants holder, never instantiated
    }

}
/**
 * Checked exception raised by the atmos component.
 */
public class AtmosException extends Exception {

    private static final long serialVersionUID = 1L;

    /**
     * Creates an exception that wraps an underlying failure.
     *
     * @param message description of what went wrong
     * @param cause   the failure that triggered this exception
     */
    public AtmosException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * Creates an exception without an underlying cause.
     *
     * @param message description of what went wrong
     */
    public AtmosException(String message) {
        super(message);
    }
}
/**
 * Operations selectable on an Atmos endpoint. Each constant carries a text
 * label, which is what {@link #toString()} returns.
 */
public enum AtmosOperation {
    put("put"),
    del("del"),
    search("search"),
    get("get"),
    move("move");

    /** Text form of the operation, returned by {@link #toString()}. */
    private final String label;

    // enum constructors are implicitly private
    AtmosOperation(final String label) {
        this.label = label;
    }

    /**
     * @return the text label given to this constant at declaration
     */
    @Override
    public String toString() {
        return label;
    }

}
+ */ +package org.apache.camel.component.atmos.util; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.Properties; + +public final class AtmosPropertyManager { + + private static Properties properties; + private static AtmosPropertyManager instance; + + private AtmosPropertyManager() { } + + public static synchronized AtmosPropertyManager getInstance() throws Exception { + if (instance == null) { + instance = new AtmosPropertyManager(); + properties = loadProperties(); + } + return instance; + } + + public String getProperty(String key) { + return properties.getProperty(key); + } + + + private static Properties loadProperties() throws Exception { + URL url = AtmosPropertyManager.class.getResource("/atmos.properties"); + InputStream inStream; + try { + inStream = url.openStream(); + } catch (IOException e) { + e.printStackTrace(); + throw new AtmosException("atmos.properties could not be found"); + } + properties = new Properties(); + try { + properties.load(inStream); + } catch (IOException e) { + e.printStackTrace(); + throw new AtmosException("atmos.properties can't be read"); + } + return properties; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/7d292021/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/util/AtmosResultCode.java ---------------------------------------------------------------------- diff --git a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/util/AtmosResultCode.java b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/util/AtmosResultCode.java new file mode 100644 index 0000000..db860ce --- /dev/null +++ b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/util/AtmosResultCode.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
/**
 * Two-valued result code.
 * NOTE(review): presumably OK marks success and KO failure -- confirm against the callers.
 */
public enum AtmosResultCode {
    OK,
    KO
}
/**
 * Names of the result headers used by the Atmos component.
 * NOTE(review): presumably used as message header keys for operation results -- confirm against the producers/consumers.
 */
public enum AtmosResultHeader {
    DOWNLOADED_FILE,
    DOWNLOADED_FILES,
    UPLOADED_FILE,
    UPLOADED_FILES,
    FOUND_FILES,
    DELETED_PATH,
    MOVED_PATH
}
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.camel.component.atmos.validator; + +import java.io.File; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.camel.component.atmos.AtmosConfiguration; +import org.apache.camel.component.atmos.util.AtmosException; +import org.apache.camel.component.atmos.util.AtmosOperation; +import static org.apache.camel.component.atmos.util.AtmosConstants.ATMOS_FILE_SEPARATOR; + +public final class AtmosConfigurationValidator { + + private AtmosConfigurationValidator() { + } + + /** + * Validate the parameters passed in the incoming url. + * + * @param configuration object containing the parameters. 
+ * @throws AtmosException + */ + public static void validate(AtmosConfiguration configuration) throws AtmosException { + validateCommonProperties(configuration); + AtmosOperation op = configuration.getOperation(); + if (op == AtmosOperation.get) { + validateGetOp(configuration); + } else if (op == AtmosOperation.put) { + validatePutOp(configuration); + } else if (op == AtmosOperation.del) { + validateDelOp(configuration); + } else if (op == AtmosOperation.move) { + validateMoveOp(configuration); + } + } + + private static void validateCommonProperties(AtmosConfiguration configuration) throws AtmosException { + if (configuration.getFullTokenId() == null || configuration.getFullTokenId().equals("")) { + throw new AtmosException("option <fullTokenId> is not present or not valid!"); + } + if (configuration.getSecretKey() == null || configuration.getSecretKey().equals("")) { + throw new AtmosException("option <secretKey> is not present or not valid!"); + } + if (configuration.getUri() == null || configuration.getUri().equals("")) { + throw new AtmosException("option <uri> is not present!"); + } else { + try { + URI uri = new URI(configuration.getUri()); + } catch (URISyntaxException use) { + throw new AtmosException("option <uri> is not valid!", use); + } + } + } + + private static void validateGetOp(AtmosConfiguration configuration) throws AtmosException { + validateRemotePath(configuration.getRemotePath()); + } + + private static void validatePutOp(AtmosConfiguration configuration) throws AtmosException { + validateLocalPath(configuration.getLocalPath()); + //remote path is optional + if (configuration.getRemotePath() != null) { + validateRemotePathForPut(configuration.getRemotePath()); + } else { //in case remote path is not set, local path is even the remote path so it must be validated as UNIX + validatePathInUnix(configuration.getLocalPath()); + } + } + + private static void validateDelOp(AtmosConfiguration configuration) throws AtmosException { + 
validateRemotePath(configuration.getRemotePath()); + } + + private static void validateMoveOp(AtmosConfiguration configuration) throws AtmosException { + validateRemotePath(configuration.getRemotePath()); + validateRemotePath(configuration.getNewRemotePath()); + } + + private static void validateLocalPath(String localPath) throws AtmosException { + if (localPath == null || localPath.equals("")) { + throw new AtmosException("option <localPath> is not present or not valid!"); + } + File file = new File(localPath); + if (!file.exists()) { + throw new AtmosException("option <localPath> is not an existing file or directory!"); + } + } + + private static void validateRemotePath(String remotePath) throws AtmosException { + if (remotePath == null || !remotePath.startsWith(ATMOS_FILE_SEPARATOR)) { + throw new AtmosException("option <remotePath> is not valid!"); + } + validatePathInUnix(remotePath); + } + + private static void validateRemotePathForPut(String remotePath) throws AtmosException { + if (!remotePath.startsWith(ATMOS_FILE_SEPARATOR)) { + throw new AtmosException("option <remotePath> is not valid!"); + } + validatePathInUnix(remotePath); + } + + private static void validatePathInUnix(String path) throws AtmosException { + Pattern pattern = Pattern.compile("/*?(\\S+)/*?", Pattern.CASE_INSENSITIVE); + Matcher matcher = pattern.matcher(path); + if (!matcher.matches()) { + throw new AtmosException(path + " is not a valid path, must be in UNIX form!"); + } + } + +}