omarsmak commented on a change in pull request #4697: URL: https://github.com/apache/camel/pull/4697#discussion_r545811298
########## File path: components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeEndpoint.java ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.azure.storage.datalake; + +import com.azure.storage.file.datalake.DataLakeServiceClient; +import org.apache.camel.Category; +import org.apache.camel.Component; +import org.apache.camel.Consumer; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.azure.storage.datalake.client.DataLakeClientFactory; +import org.apache.camel.component.azure.storage.datalake.operations.DataLakeOperationResponse; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.support.DefaultEndpoint; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@UriEndpoint(firstVersion = "3.7.0", scheme = "azure-storage-datalake", title = "Azure storage datalake service", + syntax = "azure-storage-datalake:accounName/fileSystemName", category = { Category.CLOUD, Category.FILE }) Review comment: typo: `accounName` should be `accountName`
########## File path: components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeEndpoint.java ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.azure.storage.datalake; + +import com.azure.storage.file.datalake.DataLakeServiceClient; +import org.apache.camel.Category; +import org.apache.camel.Component; +import org.apache.camel.Consumer; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.azure.storage.datalake.client.DataLakeClientFactory; +import org.apache.camel.component.azure.storage.datalake.operations.DataLakeOperationResponse; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.support.DefaultEndpoint; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@UriEndpoint(firstVersion = "3.7.0", scheme = "azure-storage-datalake", title = "Azure storage datalake service", Review comment: `firstVersion` should be `3.8.0`.
########## File path: components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConstants.java ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.azure.storage.datalake; + +public final class DataLakeConstants { + private static final String HEADER_PREFIX = "CamelAzureStorageDataLake"; + + public static final String LIST_FILESYSTEMS_OPTIONS = HEADER_PREFIX + "ListFileSystemsOptions"; + public static final String TIMEOUT = HEADER_PREFIX + "Timeout"; + public static final String DATALAKE_OPERATION = HEADER_PREFIX + "Operation"; + public static final String FILESYSTEM_NAME = HEADER_PREFIX + "FileSystemName"; + public static final String DIRECTORY_NAME = HEADER_PREFIX + "DirectoryName"; + public static final String FILE_NAME = HEADER_PREFIX + "FileName"; + public static final String METADATA = HEADER_PREFIX + "Metadata"; + public static final String PUBLIC_ACCESS_TYPE = HEADER_PREFIX + "PublicAccessType"; + public static final String RAW_HTTP_HEADERS = HEADER_PREFIX + "RawHttpHeaders"; + public static final String DATALAKE_REQUEST_CONDITION = HEADER_PREFIX + "RequestCondition"; + public static final String 
LIST_PATH_OPTIONS = HEADER_PREFIX + "ListPathOptions"; Review comment: This constant seems to be used, can you please check? ########## File path: catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/azure-storage-datalake-component.adoc ########## @@ -0,0 +1,119 @@ +[[azure-storage-datalake-component]] Review comment: The documentation is lacking a lot of info, please add info about the component, examples, usage, info about the headers that users can use in the producer and expect in the consumer, any features that you implemented and you want users to try out ... etc. Ex: https://camel.apache.org/components/latest/azure-storage-blob-component.html ########## File path: test-infra/camel-test-infra-azure-storage-datalake/src/test/java/org/apache/camel/test/infra/azure/storage/datalake/clients/AzureStorageDataLakeClientUtils.java ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.camel.test.infra.azure.storage.datalake.clients; + +import com.azure.storage.common.StorageSharedKeyCredential; +import com.azure.storage.file.datalake.DataLakeServiceClient; +import com.azure.storage.file.datalake.DataLakeServiceClientBuilder; +import org.apache.camel.test.infra.azure.common.AzureConfigs; + +public final class AzureStorageDataLakeClientUtils { + + private AzureStorageDataLakeClientUtils() { + + } + + public static DataLakeServiceClient getClient() { + String instanceType = System.getProperty("azure.instance.type"); + + String accountName = System.getProperty(AzureConfigs.ACCOUNT_NAME); + String accountKey = System.getProperty(AzureConfigs.ACCOUNT_KEY); + StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey); + + String host = System.getProperty(AzureConfigs.HOST); Review comment: I see `HOST` and `PORT` is not used. Can you remove these since Azurite is not yet supporting `Datalake` ########## File path: components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeEndpoint.java ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.azure.storage.datalake; + +import com.azure.storage.file.datalake.DataLakeServiceClient; +import org.apache.camel.Category; +import org.apache.camel.Component; +import org.apache.camel.Consumer; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.azure.storage.datalake.client.DataLakeClientFactory; +import org.apache.camel.component.azure.storage.datalake.operations.DataLakeOperationResponse; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.support.DefaultEndpoint; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@UriEndpoint(firstVersion = "3.7.0", scheme = "azure-storage-datalake", title = "Azure storage datalake service", + syntax = "azure-storage-datalake:accounName/fileSystemName", category = { Category.CLOUD, Category.FILE }) +public class DataLakeEndpoint extends DefaultEndpoint { + private static final Logger LOG = LoggerFactory.getLogger(DataLakeEndpoint.class); + + @UriParam(description = "service client of datalake") + private DataLakeServiceClient dataLakeServiceClient; + + @UriParam(description = "configuration object of azure datalake") + private DataLakeConfiguration configuration; + + public DataLakeEndpoint() { + } + + public DataLakeEndpoint(final String uri, final Component component, final DataLakeConfiguration configuration) { + super(uri, component); + this.configuration = configuration; + } + + @Override + public Producer createProducer() throws Exception { + return new DataLakeProducer(this); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + return new DataLakeConsumer(this, processor); + } + + public DataLakeConfiguration getConfiguration() { + return configuration; + } + + public void setConfiguration(DataLakeConfiguration configuration) { + this.configuration = 
configuration; + } + + public DataLakeServiceClient getDataLakeServiceClient() { + return dataLakeServiceClient; + } + + public void setDataLakeServiceClient(DataLakeServiceClient dataLakeServiceClient) { + this.dataLakeServiceClient = dataLakeServiceClient; + } + + public void setResponseOnExchange(final DataLakeOperationResponse response, final Exchange exchange) { + final Message message = exchange.getIn(); + message.setBody(response.getBody()); + message.setHeaders(response.getHeaders()); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + if (configuration.getServiceClient() != null) { Review comment: If the client can be set on the endpoint, can you add an extra check for whether the client is set on the endpoint as well? ########## File path: components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.azure.storage.datalake; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +import com.azure.storage.file.datalake.DataLakeFileSystemClient; +import com.azure.storage.file.datalake.models.DataLakeStorageException; +import com.azure.storage.file.datalake.models.PathItem; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.ExtendedExchange; +import org.apache.camel.Processor; +import org.apache.camel.component.azure.storage.datalake.client.DataLakeFileClientWrapper; +import org.apache.camel.component.azure.storage.datalake.client.DataLakeFileSystemClientWrapper; +import org.apache.camel.component.azure.storage.datalake.operations.DataLakeFileOperations; +import org.apache.camel.component.azure.storage.datalake.operations.DataLakeFileSystemOperations; +import org.apache.camel.component.azure.storage.datalake.operations.DataLakeOperationResponse; +import org.apache.camel.spi.Synchronization; +import org.apache.camel.support.ScheduledBatchPollingConsumer; +import org.apache.camel.util.CastUtils; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class DataLakeConsumer extends ScheduledBatchPollingConsumer { + + public static final int NOT_FOUND = 404; + private static final Logger LOG = LoggerFactory.getLogger(DataLakeConsumer.class); + + public DataLakeConsumer(Endpoint endpoint, Processor processor) { + super(endpoint, processor); + } + + @Override + protected int poll() throws Exception { + final String fileSystemName = getEndpoint().getConfiguration().getFileSystemName(); + final String fileName = getEndpoint().getConfiguration().getFileName(); + final String directoryName = getEndpoint().getConfiguration().getDirectoryName(); Review comment: `directoryName` is not used here, any purpose? 
########## File path: components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeProducer.java ########## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.azure.storage.datalake; + +import java.io.IOException; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.component.azure.storage.datalake.client.DataLakeDirectoryClientWrapper; +import org.apache.camel.component.azure.storage.datalake.client.DataLakeFileClientWrapper; +import org.apache.camel.component.azure.storage.datalake.client.DataLakeFileSystemClientWrapper; +import org.apache.camel.component.azure.storage.datalake.client.DataLakeServiceClientWrapper; +import org.apache.camel.component.azure.storage.datalake.operations.DataLakeDirectoryOperations; +import org.apache.camel.component.azure.storage.datalake.operations.DataLakeFileOperations; +import org.apache.camel.component.azure.storage.datalake.operations.DataLakeFileSystemOperations; +import org.apache.camel.component.azure.storage.datalake.operations.DataLakeOperationResponse; +import 
org.apache.camel.component.azure.storage.datalake.operations.DataLakeServiceOperations; +import org.apache.camel.support.DefaultProducer; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DataLakeProducer extends DefaultProducer { + + private static final Logger LOG = LoggerFactory.getLogger(DataLakeProducer.class); + private final DataLakeConfiguration configuration; + private final DataLakeConfigurationOptionsProxy configurationProxy; + private final DataLakeServiceClientWrapper dataLakeServiceClientWrapper; + + public DataLakeProducer(final Endpoint endpoint) { + super(endpoint); + // this.configuration = getEndpoint().getConfiguration(); + configuration = getEndpoint().getConfiguration(); + configurationProxy = new DataLakeConfigurationOptionsProxy(configuration); + dataLakeServiceClientWrapper = new DataLakeServiceClientWrapper(getEndpoint().getDataLakeServiceClient()); + } + + @Override + public DataLakeEndpoint getEndpoint() { + return (DataLakeEndpoint) super.getEndpoint(); + } + + @Override + public void process(Exchange exchange) throws IllegalArgumentException, IOException { + DataLakeOperationsDefinition operation = determineOperation(exchange); + switch (operation) { + case listFileSystem: + setResponse(exchange, getServiceOperations(exchange).listFileSystems(exchange)); + break; + case createFileSystem: + setResponse(exchange, getFileSystemOperations(exchange).createFileSystem(exchange)); + break; + case deleteFileSystem: + setResponse(exchange, getFileSystemOperations(exchange).deleteFileSystem(exchange)); + break; + case listPaths: + setResponse(exchange, getFileSystemOperations(exchange).listPaths(exchange)); + break; + case getFile: + setResponse(exchange, getFileOperations(exchange).getFile(exchange)); + break; + case downloadToFile: + setResponse(exchange, getFileOperations(exchange).downloadToFile(exchange)); + break; + case downloadLink: + setResponse(exchange, 
getFileOperations(exchange).downloadLink(exchange)); + break; + case deleteFile: + setResponse(exchange, getFileOperations(exchange).deleteFile(exchange)); + break; + case appendToFile: + setResponse(exchange, getFileOperations(exchange).appendToFile(exchange)); + break; + case flushToFile: + setResponse(exchange, getFileOperations(exchange).flushToFile(exchange)); + break; + case uploadFromFile: + setResponse(exchange, getFileOperations(exchange).uploadFromFile(exchange)); + break; + case openQueryInputStream: + setResponse(exchange, getFileOperations(exchange).openQueryInputStream(exchange)); + break; + case upload: + setResponse(exchange, getFileOperations(exchange).upload(exchange)); + break; + default: + throw new IllegalArgumentException("Unsupported operation"); + } + } + + private void setResponse(final Exchange exchange, final DataLakeOperationResponse dataLakeOperationResponse) { + exchange.getMessage().setBody(dataLakeOperationResponse.getBody()); + exchange.getMessage().setHeaders(dataLakeOperationResponse.getHeaders()); + } + + private DataLakeOperationsDefinition determineOperation(final Exchange exchange) { + return configurationProxy.getOperation(exchange); + } + + private DataLakeServiceOperations getServiceOperations(final Exchange exchange) { + return new DataLakeServiceOperations(configuration, dataLakeServiceClientWrapper); + } + + private DataLakeFileSystemOperations getFileSystemOperations(final Exchange exchange) { + LOG.info("getting file system operation object"); + final DataLakeFileSystemClientWrapper clientWrapper + = dataLakeServiceClientWrapper.getDataLakeFileSystemClientWrapper(determineFileSystemName(exchange)); + + return new DataLakeFileSystemOperations(configuration, clientWrapper); + } + + private DataLakeDirectoryOperations getDirectoryOperations(final Exchange exchange) { Review comment: I see this operation is not used ---------------------------------------------------------------- This is an automated message from the 
Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org