This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push: new 00d90e3f4a Revert "NIFI-12231 FetchSmb supports Move and Delete Completion Strategies" 00d90e3f4a is described below commit 00d90e3f4a71eac304efaa30faf6137c8e507798 Author: Joseph Witt <joew...@apache.org> AuthorDate: Mon May 6 11:37:56 2024 -0700 Revert "NIFI-12231 FetchSmb supports Move and Delete Completion Strategies" This reverts commit d3a7549e8109774b06e79f13d570ef5da04c451f. --- .../processors/smb/util/CompletionStrategy.java | 49 ----- .../apache/nifi/services/smb/SmbClientService.java | 10 +- .../org/apache/nifi/processors/smb/FetchSmb.java | 152 +++++---------- .../org/apache/nifi/processors/smb/ListSmb.java | 2 +- .../org/apache/nifi/processors/smb/FetchSmbIT.java | 212 ++------------------- .../apache/nifi/processors/smb/ListSmbTest.java | 4 +- .../nifi/processors/smb/SambaTestContainers.java | 59 ++---- .../nifi/services/smb/SmbjClientService.java | 113 ++++------- .../nifi/services/smb/SmbjClientServiceIT.java | 2 +- .../nifi/services/smb/SmbjClientServiceTest.java | 2 +- 10 files changed, 123 insertions(+), 482 deletions(-) diff --git a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/util/CompletionStrategy.java b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/util/CompletionStrategy.java deleted file mode 100644 index cf3dd0fcbf..0000000000 --- a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/util/CompletionStrategy.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.smb.util; - -import org.apache.nifi.components.DescribedValue; - -public enum CompletionStrategy implements DescribedValue { - - NONE("None", "Leaves the file as-is."), - MOVE("Move File", "Moves the file to the specified directory on the remote system. This option cannot be used when DFS is enabled on 'SMB Client Provider Service'."), - DELETE("Delete File", "Deletes the file from the remote system."); - - private final String displayName; - private final String description; - - CompletionStrategy(String displayName, String description) { - this.displayName = displayName; - this.description = description; - } - - @Override - public String getValue() { - return name(); - } - - @Override - public String getDisplayName() { - return displayName; - } - - @Override - public String getDescription() { - return description; - } -} diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java index c70dea133e..9eb0e41a67 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java @@ -25,13 +25,9 @@ import java.util.stream.Stream; */ public interface SmbClientService extends AutoCloseable { - Stream<SmbListableEntity> listFiles(String directoryPath); + Stream<SmbListableEntity> listRemoteFiles(String path); - void ensureDirectory(String directoryPath); + void createDirectory(String path); - void readFile(String filePath, OutputStream outputStream) throws IOException; - - void moveFile(String filePath, String directoryPath); - - void deleteFile(String filePath); + void readFile(String fileName, OutputStream outputStream) throws IOException; } diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java index b5f58b5890..c38216058d 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java @@ -16,6 +16,16 @@ */ package org.apache.nifi.processors.smb; +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableSet; +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; +import static org.apache.nifi.processor.util.StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; @@ -23,29 +33,17 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.smb.util.CompletionStrategy; import org.apache.nifi.services.smb.SmbClientProviderService; import org.apache.nifi.services.smb.SmbClientService; import org.apache.nifi.services.smb.SmbException; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; - -import static java.util.Arrays.asList; -import static java.util.Collections.unmodifiableSet; -import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; -import static org.apache.nifi.processor.util.StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR; - @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @Tags({"samba", "smb", "cifs", "files", "fetch"}) @CapabilityDescription("Fetches files from a SMB Share. Designed to be used in tandem with ListSmb.") @@ -59,8 +57,8 @@ public class FetchSmb extends AbstractProcessor { public static final String ERROR_CODE_ATTRIBUTE = "error.code"; public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message"; - public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor.Builder() - .name("remote-file") + public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor + .Builder().name("remote-file") .displayName("Remote File") .description("The full path of the file to be retrieved from the remote server. Expression language is supported.") .required(true) @@ -69,139 +67,91 @@ public class FetchSmb extends AbstractProcessor { .addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR) .build(); - public static final PropertyDescriptor COMPLETION_STRATEGY = new PropertyDescriptor.Builder() - .name("Completion Strategy") - .description("Specifies what to do with the original file on the server once it has been processed. If the Completion Strategy fails, a warning will be " - + "logged but the data will still be transferred.") - .allowableValues(CompletionStrategy.class) - .defaultValue(CompletionStrategy.NONE) - .required(true) - .build(); - - public static final PropertyDescriptor DESTINATION_DIRECTORY = new PropertyDescriptor.Builder() - .name("Destination Directory") - .description("The directory on the remote server to move the original file to once it has been processed.") - .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .required(true) - .dependsOn(COMPLETION_STRATEGY, CompletionStrategy.MOVE) - .build(); - - public static final PropertyDescriptor CREATE_DESTINATION_DIRECTORY = new PropertyDescriptor.Builder() - .name("Create Destination Directory") - .description("Specifies whether or not the remote directory should be created if it does not exist.") - .required(true) - .allowableValues("true", "false") - .defaultValue("false") - .dependsOn(COMPLETION_STRATEGY, CompletionStrategy.MOVE) - .build(); - - public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new PropertyDescriptor.Builder() + public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder() .name("smb-client-provider-service") .displayName("SMB Client Provider Service") .description("Specifies the SMB client provider to use for creating SMB connections.") .required(true) .identifiesControllerService(SmbClientProviderService.class) .build(); - public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") - .description("A FlowFile will be routed here for each successfully fetched file.") + .description("A flowfile will be routed here for each successfully fetched file.") .build(); - public static final Relationship REL_FAILURE = - new Relationship.Builder() - .name("failure") - .description("A FlowFile will be routed here when failed to fetch its content.") + new Relationship.Builder().name("failure") + .description( + "A flowfile will be routed here when failed to fetch its content.") .build(); - public static final Set<Relationship> RELATIONSHIPS = unmodifiableSet(new HashSet<>(asList( REL_SUCCESS, REL_FAILURE ))); - + public static final String UNCATEGORIZED_ERROR = "-2"; private static final List<PropertyDescriptor> PROPERTIES = asList( SMB_CLIENT_PROVIDER_SERVICE, - REMOTE_FILE, - COMPLETION_STRATEGY, - DESTINATION_DIRECTORY, - CREATE_DESTINATION_DIRECTORY + REMOTE_FILE ); - public static final String UNCATEGORIZED_ERROR = "-2"; - @Override public Set<Relationship> getRelationships() { return RELATIONSHIPS; } @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return PROPERTIES; - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { FlowFile flowFile = session.get(); if (flowFile == null) { return; } - final Map<String, String> attributes = flowFile.getAttributes(); - final String filePath = context.getProperty(REMOTE_FILE).evaluateAttributeExpressions(attributes).getValue(); - - final SmbClientProviderService clientProviderService = context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class); + final SmbClientProviderService clientProviderService = + context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class); try (SmbClientService client = clientProviderService.getClient()) { - flowFile = session.write(flowFile, outputStream -> client.readFile(filePath, outputStream)); - - session.transfer(flowFile, REL_SUCCESS); + fetchAndTransfer(session, context, client, flowFile); } catch (Exception e) { - getLogger().error("Could not fetch file {}.", filePath, e); + getLogger().error("Couldn't connect to SMB.", e); flowFile = session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE, getErrorCode(e)); flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage()); session.transfer(flowFile, REL_FAILURE); - return; } - session.commitAsync(() -> performCompletionStrategy(context, attributes)); } - private String getErrorCode(final Exception exception) { + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + private void fetchAndTransfer(ProcessSession session, ProcessContext context, SmbClientService client, + FlowFile flowFile) { + final Map<String, String> attributes = flowFile.getAttributes(); + final String filename = context.getProperty(REMOTE_FILE) + .evaluateAttributeExpressions(attributes).getValue(); + try { + flowFile = session.write(flowFile, outputStream -> client.readFile(filename, outputStream)); + session.transfer(flowFile, REL_SUCCESS); + } catch (Exception e) { + getLogger().error("Couldn't fetch file {}.", filename, e); + flowFile = session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE, getErrorCode(e)); + flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, getErrorMessage(e)); + session.transfer(flowFile, REL_FAILURE); + } + } + + private String getErrorCode(Exception exception) { return Optional.ofNullable(exception instanceof SmbException ? (SmbException) exception : null) .map(SmbException::getErrorCode) .map(String::valueOf) .orElse(UNCATEGORIZED_ERROR); } - private void performCompletionStrategy(final ProcessContext context, final Map<String, String> attributes) { - final CompletionStrategy completionStrategy = context.getProperty(COMPLETION_STRATEGY).asAllowableValue(CompletionStrategy.class); - - if (completionStrategy == CompletionStrategy.NONE) { - return; - } - - final String filePath = context.getProperty(REMOTE_FILE).evaluateAttributeExpressions(attributes).getValue(); - - final SmbClientProviderService clientProviderService = context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class); - - try (SmbClientService client = clientProviderService.getClient()) { - if (completionStrategy == CompletionStrategy.MOVE) { - final String destinationDirectory = context.getProperty(DESTINATION_DIRECTORY).evaluateAttributeExpressions(attributes).getValue(); - final boolean createDestinationDirectory = context.getProperty(CREATE_DESTINATION_DIRECTORY).asBoolean(); - - if (createDestinationDirectory) { - client.ensureDirectory(destinationDirectory); - } - - client.moveFile(filePath, destinationDirectory); - } else if (completionStrategy == CompletionStrategy.DELETE) { - client.deleteFile(filePath); - } - } catch (Exception e) { - getLogger().warn("Could not perform completion strategy {} for file {}", completionStrategy, filePath, e); - } + private String getErrorMessage(Exception exception) { + return Optional.ofNullable(exception.getMessage()) + .orElse(exception.getClass().getSimpleName()); } } + diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java index 091a5b176e..886f1bee46 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java @@ -345,7 +345,7 @@ public class ListSmb extends AbstractListProcessor<SmbListableEntity> { context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class); final String directory = getDirectory(context); final SmbClientService clientService = clientProviderService.getClient(); - return clientService.listFiles(directory).onClose(() -> { + return clientService.listRemoteFiles(directory).onClose(() -> { try { clientService.close(); } catch (Exception e) { diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbIT.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbIT.java index a1ad08db0b..fb057791a8 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbIT.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbIT.java @@ -16,226 +16,52 @@ */ package org.apache.nifi.processors.smb; -import static org.apache.nifi.processors.smb.FetchSmb.COMPLETION_STRATEGY; -import static org.apache.nifi.processors.smb.FetchSmb.CREATE_DESTINATION_DIRECTORY; -import static org.apache.nifi.processors.smb.FetchSmb.DESTINATION_DIRECTORY; import static org.apache.nifi.processors.smb.FetchSmb.REL_FAILURE; import static org.apache.nifi.processors.smb.FetchSmb.REL_SUCCESS; import static org.apache.nifi.processors.smb.FetchSmb.REMOTE_FILE; import static org.apache.nifi.util.TestRunners.newTestRunner; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -import java.util.Collections; import java.util.HashMap; import java.util.Map; - -import org.apache.nifi.processors.smb.util.CompletionStrategy; import org.apache.nifi.services.smb.SmbjClientProviderService; import org.apache.nifi.util.TestRunner; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -class FetchSmbIT extends SambaTestContainers { - - private static final String TEST_CONTENT = "test_content"; - - private TestRunner testRunner; - - private SmbjClientProviderService smbjClientProviderService; - - @BeforeEach - void setUpComponents() throws Exception { - testRunner = newTestRunner(FetchSmb.class); - - smbjClientProviderService = configureSmbClient(testRunner, true); - } - - @AfterEach - void tearDownComponents() { - testRunner.disableControllerService(smbjClientProviderService); - } +public class FetchSmbIT extends SambaTestContainers { @Test - void fetchFilesUsingEL() { - writeFile("test_file", TEST_CONTENT); - + public void fetchFilesUsingEL() throws Exception { + writeFile("/test_file", "test_content"); + TestRunner testRunner = newTestRunner(FetchSmb.class); testRunner.setProperty(REMOTE_FILE, "${attribute_to_find_using_EL}"); + final SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner, true); - final Map<String, String> attributes = new HashMap<>(); + Map<String, String> attributes = new HashMap<>(); attributes.put("attribute_to_find_using_EL", "test_file"); - runProcessor(attributes); - - assertSuccessFlowFile(); + testRunner.enqueue("ignored", attributes); + testRunner.run(); + testRunner.assertTransferCount(REL_SUCCESS, 1); + assertEquals("test_content", testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0).getContent()); + testRunner.assertValid(); + testRunner.disableControllerService(smbjClientProviderService); } @Test - void tryToFetchNonExistingFileEmitsFailure() throws Exception { + public void tryToFetchNonExistingFileEmitsFailure() throws Exception { + TestRunner testRunner = newTestRunner(FetchSmb.class); testRunner.setProperty(REMOTE_FILE, "${attribute_to_find_using_EL}"); + final SmbjClientProviderService smbjClientProviderService = configureSmbClient(testRunner, true); - final Map<String, String> attributes = new HashMap<>(); + Map<String, String> attributes = new HashMap<>(); attributes.put("attribute_to_find_using_EL", "non_existing_file"); - runProcessor(attributes); - - testRunner.assertTransferCount(REL_FAILURE, 1); - } - - @Test - void testCompletionStrategyNone() { - final String baseDir = "dir_none"; - final String filename = "test_file"; - final String filePath = baseDir + "/" + filename; - - createDirectory(baseDir, AccessMode.READ_ONLY); - writeFile(filePath, TEST_CONTENT, AccessMode.READ_ONLY); - - testRunner.setProperty(REMOTE_FILE, filePath); - testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.NONE); - - runProcessor(); - - assertSuccessFlowFile(); - assertNoWarning(); - - assertTrue(fileExists(filePath)); - } - - @Test - void testCompletionStrategyDelete() { - final String baseDir = "dir_delete"; - final String filename = "test_file"; - final String filePath = baseDir + "/" + filename; - - createDirectory(baseDir, AccessMode.READ_WRITE); - writeFile(filePath, TEST_CONTENT, AccessMode.READ_WRITE); - - testRunner.setProperty(REMOTE_FILE, filePath); - testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.DELETE); - - runProcessor(); - - assertSuccessFlowFile(); - assertNoWarning(); - - assertFalse(fileExists(filePath)); - } - - @Test - void testCompletionStrategyMoveWithExistingDirectory() { - final String baseDir = "dir_move_existing"; - final String filename = "test_file"; - final String filePath = baseDir + "/" + filename; - final String processedDir = "processed"; - - createDirectory(baseDir, AccessMode.READ_WRITE); - writeFile(filePath, TEST_CONTENT, AccessMode.READ_WRITE); - createDirectory(processedDir, AccessMode.READ_WRITE); - - testRunner.setProperty(REMOTE_FILE, filePath); - testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.MOVE); - testRunner.setProperty(DESTINATION_DIRECTORY, processedDir); - - runProcessor(); - - assertSuccessFlowFile(); - assertNoWarning(); - - assertFalse(fileExists(filePath)); - assertTrue(fileExists(processedDir + "/" + filename)); - } - - @Test - void testCompletionStrategyMoveWithCreatingDirectory() { - final String baseDir = "dir_move_creating"; - final String filename = "test_file"; - final String filePath = baseDir + "/" + filename; - final String processedDir = "processed"; - - createDirectory(baseDir, AccessMode.READ_WRITE); - writeFile(filePath, TEST_CONTENT, AccessMode.READ_WRITE); - - testRunner.setProperty(REMOTE_FILE, filePath); - testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.MOVE); - testRunner.setProperty(DESTINATION_DIRECTORY, processedDir); - testRunner.setProperty(CREATE_DESTINATION_DIRECTORY, "true"); - - runProcessor(); - - assertSuccessFlowFile(); - assertNoWarning(); - - assertFalse(fileExists(filePath)); - assertTrue(fileExists(processedDir + "/" + filename)); - } - - @Test - void testCompletionStrategyDeleteFailsWhenNoPermission() { - final String baseDir = "dir_delete_noperm"; - final String filename = "test_file"; - final String filePath = baseDir + "/" + filename; - - createDirectory(baseDir, AccessMode.READ_ONLY); - writeFile(filePath, TEST_CONTENT, AccessMode.READ_ONLY); - - testRunner.setProperty(REMOTE_FILE, filePath); - testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.DELETE); - - runProcessor(); - - assertSuccessFlowFile(); - assertWarning(); - - assertTrue(fileExists(filePath)); - } - - @Test - void testCompletionStrategyMoveFailsWhenNoPermission() { - final String baseDir = "dir_move_noperm"; - final String filename = "test_file"; - final String filePath = baseDir + "/" + filename; - final String processedDir = "processed"; - - createDirectory(baseDir, AccessMode.READ_ONLY); - writeFile(filePath, TEST_CONTENT, AccessMode.READ_ONLY); - createDirectory(processedDir, AccessMode.READ_ONLY); - - testRunner.setProperty(REMOTE_FILE, filePath); - testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.MOVE); - testRunner.setProperty(DESTINATION_DIRECTORY, processedDir); - - runProcessor(); - - assertSuccessFlowFile(); - assertWarning(); - - assertTrue(fileExists(filePath)); - assertFalse(fileExists(processedDir + "/" + filename)); - } - - private void runProcessor() { - runProcessor(Collections.emptyMap()); - } - - private void runProcessor(final Map<String, String> attributes) { testRunner.enqueue("ignored", attributes); testRunner.run(); - } - - private void assertSuccessFlowFile() { - testRunner.assertTransferCount(REL_SUCCESS, 1); - assertEquals(TEST_CONTENT, testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0).getContent()); - } - - private void assertWarning() { - assertFalse(testRunner.getLogger().getWarnMessages().isEmpty()); - } - - private void assertNoWarning() { - assertTrue(testRunner.getLogger().getWarnMessages().isEmpty()); + testRunner.assertTransferCount(REL_FAILURE, 1); + testRunner.assertValid(); + testRunner.disableControllerService(smbjClientProviderService); } } diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java index 621ec74209..0594ef3e2f 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java @@ -232,7 +232,7 @@ class ListSmbTest { testRunner.setProperty(LISTING_STRATEGY, "timestamps"); testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis"); final SmbClientService mockNifiSmbClientService = configureTestRunnerWithMockedSambaClient(testRunner); - when(mockNifiSmbClientService.listFiles(anyString())).thenThrow(new RuntimeException("test exception")); + when(mockNifiSmbClientService.listRemoteFiles(anyString())).thenThrow(new RuntimeException("test exception")); testRunner.run(); assertEquals(1, testRunner.getLogger().getErrorMessages().size()); testRunner.assertValid(); @@ -282,7 +282,7 @@ class ListSmbTest { } private void mockSmbFolders(SmbClientService mockNifiSmbClientService, SmbListableEntity... entities) { - doAnswer(ignore -> stream(entities)).when(mockNifiSmbClientService).listFiles(anyString()); + doAnswer(ignore -> stream(entities)).when(mockNifiSmbClientService).listRemoteFiles(anyString()); } private SmbListableEntity listableEntity(String name, long timeStamp) { diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestContainers.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestContainers.java index 51c283b85d..f38d36d171 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestContainers.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestContainers.java @@ -39,19 +39,12 @@ import org.testcontainers.utility.DockerImageName; public class SambaTestContainers { - protected final static Logger LOGGER = LoggerFactory.getLogger(SambaTestContainers.class); - protected final static Integer DEFAULT_SAMBA_PORT = 445; - - protected enum AccessMode { - READ_ONLY,READ_WRITE; - } - + protected final static Logger logger = LoggerFactory.getLogger(SambaTestContainers.class); protected final GenericContainer<?> sambaContainer = new GenericContainer<>(DockerImageName.parse("dperson/samba")) - .withCreateContainerCmdModifier(cmd -> cmd.withName("samba-test")) .withExposedPorts(DEFAULT_SAMBA_PORT, 139) .waitingFor(Wait.forListeningPort()) - .withLogConsumer(new Slf4jLogConsumer(LOGGER)) + .withLogConsumer(new Slf4jLogConsumer(logger)) .withCommand("-w domain -u username;password -s share;/folder;;no;no;username;;; -p"); @BeforeEach @@ -64,63 +57,33 @@ public class SambaTestContainers { sambaContainer.stop(); } - protected SmbjClientProviderService configureSmbClient(final TestRunner testRunner, final boolean shouldEnableSmbClient) throws Exception { + protected SmbjClientProviderService configureSmbClient(TestRunner testRunner, boolean shouldEnableSmbClient) + throws Exception { final SmbjClientProviderService smbjClientProviderService = new SmbjClientProviderService(); testRunner.addControllerService("client-provider", smbjClientProviderService); - testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, "client-provider"); testRunner.setProperty(smbjClientProviderService, HOSTNAME, sambaContainer.getHost()); - testRunner.setProperty(smbjClientProviderService, PORT, String.valueOf(sambaContainer.getMappedPort(DEFAULT_SAMBA_PORT))); + testRunner.setProperty(smbjClientProviderService, PORT, + String.valueOf(sambaContainer.getMappedPort(DEFAULT_SAMBA_PORT))); testRunner.setProperty(smbjClientProviderService, USERNAME, "username"); testRunner.setProperty(smbjClientProviderService, PASSWORD, "password"); testRunner.setProperty(smbjClientProviderService, SHARE, "share"); testRunner.setProperty(smbjClientProviderService, DOMAIN, "domain"); - if (shouldEnableSmbClient) { testRunner.enableControllerService(smbjClientProviderService); } - return smbjClientProviderService; } - protected String generateContentWithSize(final int sizeInBytes) { - final byte[] bytes = new byte[sizeInBytes]; + protected String generateContentWithSize(int sizeInBytes) { + byte[] bytes = new byte[sizeInBytes]; fill(bytes, (byte) 1); return new String(bytes); } - protected void createDirectory(final String path) { - createDirectory(path, AccessMode.READ_ONLY); - } - - protected void createDirectory(final String path, final AccessMode accessMode) { - final String dirMode = accessMode == AccessMode.READ_ONLY ? "755" : "777"; - try { - sambaContainer.execInContainer("bash", "-c", "mkdir -m " + dirMode + " -p " + getContainerPath(path)); - } catch (Exception e) { - throw new RuntimeException("Failed to create directory", e); - } - } - - protected void writeFile(final String path, final String content) { - writeFile(path, content, AccessMode.READ_ONLY); - } - - protected void writeFile(final String path, final String content, final AccessMode accessMode) { - final int fileMode = accessMode == AccessMode.READ_ONLY ? 0100644: 0100666; - sambaContainer.copyFileToContainer(Transferable.of(content, fileMode), getContainerPath(path)); - } - - protected boolean fileExists(final String path) { - try { - return sambaContainer.execInContainer("bash", "-c", "cat " + getContainerPath(path) + " > /dev/null").getExitCode() == 0; - } catch (Exception e) { - throw new RuntimeException("Failed to check file", e); - } - } - - private String getContainerPath(final String path) { - return "/folder/" + path; + protected void writeFile(String path, String content) { + String containerPath = "/folder/" + path; + sambaContainer.copyFileToContainer(Transferable.of(content), containerPath); } } diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java index 4e04bf85d1..ae9307ea64 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java @@ -20,7 +20,6 @@ import static java.util.Arrays.asList; import static java.util.stream.StreamSupport.stream; import com.hierynomus.msdtyp.AccessMask; -import com.hierynomus.mserref.NtStatus; import com.hierynomus.msfscc.FileAttributes; import com.hierynomus.msfscc.fileinformation.FileIdBothDirectoryInformation; import com.hierynomus.mssmb2.SMB2CreateDisposition; @@ -46,13 +45,13 @@ class SmbjClientService implements SmbClientService { private final static Logger LOGGER = LoggerFactory.getLogger(SmbjClientService.class); private static final List<String> SPECIAL_DIRECTORIES = asList(".", ".."); - private static final long UNCATEGORIZED_ERROR = -1L; + private static final long UNCATEGORISED_ERROR = -1L; private final Session session; private final DiskShare share; private final URI serviceLocation; - SmbjClientService(final Session session, final DiskShare share, final URI serviceLocation) { + SmbjClientService(Session session, DiskShare share, URI serviceLocation) { this.session = session; this.share = share; this.serviceLocation = serviceLocation; @@ -70,89 +69,50 @@ class SmbjClientService implements SmbClientService { } @Override - public Stream<SmbListableEntity> listFiles(final String directoryPath) { - return Stream.of(directoryPath).flatMap(path -> { + public Stream<SmbListableEntity> listRemoteFiles(String filePath) { + return Stream.of(filePath).flatMap(path -> { final Directory directory = openDirectory(path); return stream(directory::spliterator, 0, false) .map(entity -> buildSmbListableEntity(entity, path, serviceLocation)) .filter(entity -> !specialDirectory(entity)) - .flatMap(listable -> listable.isDirectory() ? listFiles(listable.getPathWithName()) + .flatMap(listable -> listable.isDirectory() ? listRemoteFiles(listable.getPathWithName()) : Stream.of(listable)) .onClose(directory::close); }); } @Override - public void ensureDirectory(final String directoryPath) { - try { - final int lastDirectorySeparatorPosition = directoryPath.lastIndexOf("/"); - if (lastDirectorySeparatorPosition > 0) { - ensureDirectory(directoryPath.substring(0, lastDirectorySeparatorPosition)); - } - - if (!share.folderExists(directoryPath)) { - try { - share.mkdir(directoryPath); - } catch (SMBApiException e) { - if (e.getStatus() == NtStatus.STATUS_OBJECT_NAME_COLLISION) { - if (!share.folderExists(directoryPath)) { - throw e; - } - } - } - } - } catch (Exception e) { - throw wrapException(e); + public void createDirectory(String path) { + final int lastDirectorySeparatorPosition = path.lastIndexOf("/"); + if (lastDirectorySeparatorPosition > 0) { + createDirectory(path.substring(0, lastDirectorySeparatorPosition)); + } + if (!share.folderExists(path)) { + share.mkdir(path); } } @Override - public void readFile(final String filePath, final OutputStream outputStream) throws IOException { - try (File file = share.openFile( - filePath, + public void readFile(String fileName, OutputStream outputStream) throws IOException { + try (File f = share.openFile( + fileName, EnumSet.of(AccessMask.GENERIC_READ), EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL), EnumSet.of(SMB2ShareAccess.FILE_SHARE_READ), SMB2CreateDisposition.FILE_OPEN, EnumSet.of(SMB2CreateOptions.FILE_SEQUENTIAL_ONLY)) ) { - file.read(outputStream); + f.read(outputStream); + } catch (SMBApiException a) { + throw new SmbException(a.getMessage(), a.getStatusCode(), a); } catch (Exception e) { - throw wrapException(e); + throw new SmbException(e.getMessage(), UNCATEGORISED_ERROR, e); } finally { outputStream.close(); } } - @Override - public void moveFile(final String filePath, final String directoryPath) { - try (File file = share.openFile( - filePath, - EnumSet.of(AccessMask.GENERIC_WRITE, AccessMask.DELETE), - EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL), - EnumSet.noneOf(SMB2ShareAccess.class), - SMB2CreateDisposition.FILE_OPEN, - EnumSet.of(SMB2CreateOptions.FILE_SEQUENTIAL_ONLY)) - ) { - final String[] parts = filePath.split("/"); - // rename operation on Windows requires \ (backslash) path separator - final String newFilePath = directoryPath.replace('/', '\\') + "\\" + parts[parts.length - 1]; - file.rename(newFilePath); - } catch (Exception e) { - throw wrapException(e); - } - } - - @Override - public void deleteFile(final String filePath) { - try { - share.rm(filePath); - } catch (Exception e) { - throw wrapException(e); - } - } - - private SmbListableEntity buildSmbListableEntity(final FileIdBothDirectoryInformation info, final String path, final URI serviceLocation) { + private SmbListableEntity buildSmbListableEntity(FileIdBothDirectoryInformation info, String path, URI serviceLocation) { return SmbListableEntity.builder() .setName(info.getFileName()) .setShortName(info.getShortName()) @@ -168,30 +128,25 @@ class SmbjClientService implements SmbClientService { .build(); } - private Directory openDirectory(final String path) { - return share.openDirectory( - path, - EnumSet.of(AccessMask.GENERIC_READ), - EnumSet.of(FileAttributes.FILE_ATTRIBUTE_DIRECTORY), - EnumSet.of(SMB2ShareAccess.FILE_SHARE_READ), - SMB2CreateDisposition.FILE_OPEN, - EnumSet.of(SMB2CreateOptions.FILE_DIRECTORY_FILE) - ); + private Directory openDirectory(String path) { + try { + return share.openDirectory( + path, + EnumSet.of(AccessMask.GENERIC_READ), + EnumSet.of(FileAttributes.FILE_ATTRIBUTE_DIRECTORY), + EnumSet.of(SMB2ShareAccess.FILE_SHARE_READ), + SMB2CreateDisposition.FILE_OPEN, + EnumSet.of(SMB2CreateOptions.FILE_DIRECTORY_FILE) + ); + } catch (SMBApiException s) { + throw new RuntimeException("Could not open directory " + path + " due to " + s.getMessage(), s); + } } - private boolean specialDirectory(final SmbListableEntity entity) { + private boolean specialDirectory(SmbListableEntity entity) { return SPECIAL_DIRECTORIES.contains(entity.getName()); } - private SmbException wrapException(final Exception e) { - if (e instanceof SmbException) { - return (SmbException) e; - } else { - final long errorCode = e instanceof SMBApiException ? ((SMBApiException) e).getStatusCode() : UNCATEGORIZED_ERROR; - return new SmbException(e.getMessage(), errorCode, e); - } - } - } diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceIT.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceIT.java index e671510a2e..59278e54c5 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceIT.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceIT.java @@ -132,7 +132,7 @@ public class SmbjClientServiceIT { sambaProxy.setConnectionCut(true); } - final Set<String> actual = s.listFiles("testDirectory") + final Set<String> actual = s.listRemoteFiles("testDirectory") .map(SmbListableEntity::getIdentifier) .collect(toSet()); diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceTest.java b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceTest.java index e30038684a..7193aad091 100644 --- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceTest.java +++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceTest.java @@ -52,7 +52,7 @@ class SmbjClientServiceTest { when(share.fileExists("to")).thenReturn(false); when(share.fileExists("create")).thenReturn(false); - underTest.ensureDirectory("directory/path/to/create"); + underTest.createDirectory("directory/path/to/create"); verify(share).mkdir("directory/path"); verify(share).mkdir("directory/path/to");